Since this is a streaming application, we will use Python's logging module to record what it does.
In [1]:
import logging # python logging module
# basic format for logging
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"
# logs will be stored in tweepy.log
logging.basicConfig(filename='tweepy.log', level=logging.INFO,
                    format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")
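To see what a line written with this format looks like, the sketch below formats one record into an in-memory buffer instead of tweepy.log. The logger name `format_demo` and the helper `demo_log_line` are made up for illustration; only the format string and date format come from the cell above.

```python
import io
import logging

# Same format string and date format as in the cell above.
log_format = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"


def demo_log_line():
    """Format one INFO record with the notebook's format and return it as text."""
    buffer = io.StringIO()
    handler = logging.StreamHandler(buffer)
    handler.setFormatter(logging.Formatter(log_format, datefmt=date_format))

    logger = logging.getLogger("format_demo")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo record out of the root logger
    logger.addHandler(handler)
    try:
        logger.info("Listening on the port 8888")
    finally:
        logger.removeHandler(handler)
    return buffer.getvalue().strip()


line = demo_log_line()
print(line)
```

The printed line starts with the timestamp, followed by the level in brackets, the calling function and line number in parentheses, and finally the message.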
Create an app on Twitter and copy its consumer keys and access tokens; they are used in the code below.
Authorization is done using OAuth, an open protocol that allows secure authorization in a simple and standard way from web, mobile, and desktop applications.
We will use Tweepy, an open-source Python module hosted on GitHub that lets Python communicate with the Twitter platform and use its API. Tweepy supports OAuth authentication, which is handled by the tweepy.OAuthHandler class.
In [ ]:
import tweepy # importing all the modules required
import socket # will be used to create sockets
import json # manipulate json
In [ ]:
# Keep these tokens secret: anyone who has them
# gets full access to your Twitter account
consumerKey = "#"
consumerSecret = "#"
accessToken = "#"
accessTokenSecret = "#"
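One way to keep the tokens out of the notebook itself is to read them from environment variables. The sketch below is only a suggestion: the variable names and the `load_credentials` helper are hypothetical and not part of Tweepy or of the original notebook.

```python
import os

# Hypothetical environment variable names; pick whatever suits your setup.
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_credentials(environ=None):
    """Return the four credentials as a tuple, or raise if any is missing."""
    if environ is None:
        environ = os.environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return tuple(environ[name] for name in REQUIRED_VARS)


# Demonstration with a made-up mapping instead of the real environment.
fake_environ = {name: "value-for-" + name for name in REQUIRED_VARS}
credentials = load_credentials(fake_environ)
print(len(credentials))
```

With the variables exported in the shell, the four names used in this notebook could then be filled with `consumerKey, consumerSecret, accessToken, accessTokenSecret = load_credentials()`.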
After this step, we will have full access to the Twitter APIs.
In [ ]:
# Perform the authentication and authorization; after this step
# we will have full access to the Twitter APIs
def connectToTwitter():
    """Connect to Twitter and return the API object and the auth handler."""
    try:
        auth = tweepy.OAuthHandler(consumerKey, consumerSecret)
        auth.set_access_token(accessToken, accessTokenSecret)
        api = tweepy.API(auth)
        logging.info("Successfully logged in to twitter.")
        return api, auth
    except Exception as e:
        logging.error("Something went wrong in oauth, please check your tokens.")
        logging.error(e)
        raise  # re-raise so the caller does not try to unpack None
The Twitter Streaming API is used to download Twitter messages in real time. We use it instead of the REST API because the REST API pulls data from Twitter on request, whereas the Streaming API pushes messages over a persistent session. This lets the Streaming API deliver more data in real time than could be fetched through the REST API.
In Tweepy, an instance of tweepy.Stream establishes a streaming session and routes messages to a StreamListener instance. The on_data method of a stream listener receives all messages and calls functions according to the message type.
But the methods it calls are only stubs, so we need to implement the functionality by subclassing StreamListener.
Using the Streaming API has three steps: create a class that inherits from StreamListener, use that class to create a Stream object, and connect to the Twitter API using the Stream.
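The routing described here, where a stream hands each raw message to a listener's on_data and stops when it returns False, can be sketched without a network connection. The classes below are toy stand-ins written for this illustration, not Tweepy classes, and the messages are made up.

```python
import json


class ToyListener(object):
    """Stand-in for a stream listener; on_data is a stub meant to be overridden."""

    def on_data(self, raw_data):
        return True


class CollectingListener(ToyListener):
    """Subclass that implements on_data and stops the stream after two messages."""

    def __init__(self):
        self.names = []

    def on_data(self, raw_data):
        message = json.loads(raw_data)
        self.names.append(message.get("user", {}).get("name", "undefined"))
        return len(self.names) < 2  # returning False stops the stream


class ToyStream(object):
    """Stand-in for a stream: pushes each raw message to the listener."""

    def __init__(self, listener):
        self.listener = listener

    def run(self, raw_messages):
        delivered = 0
        for raw in raw_messages:
            delivered += 1
            if self.listener.on_data(raw) is False:
                break
        return delivered


# Made-up messages in place of a live session.
messages = [
    '{"user": {"name": "a"}}',
    '{"user": {"name": "b"}}',
    '{"user": {"name": "c"}}',
]
listener = CollectingListener()
delivered = ToyStream(listener).run(messages)
print(delivered, listener.names)
```

The third message is never delivered because the listener returned False on the second one, which mirrors how returning False from on_data closes a streaming connection.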
In [ ]:
# Tweet listener class which subclasses tweepy.StreamListener
class TweetListner(tweepy.StreamListener):
    """Twitter stream listener"""
    def __init__(self, csocket):
        super(TweetListner, self).__init__()  # let the base class set itself up
        self.clientSocket = csocket

    def dataProcessing(self, data):
        """Process the data before sending it to Spark Streaming."""
        sendData = {}  # data that is sent to the Spark streamer
        user = data.get("user", {})
        # keep the name as text; json.dumps cannot serialize bytes
        name = user.get("name", "undefined")
        followersCount = user.get("followers_count", 0)
        sendData["name"] = name
        sendData["followersCount"] = followersCount
        #data_string = "{}:{}".format(name, followersCount)
        # append a newline character so that Spark recognizes the end of a record
        line = json.dumps(sendData) + "\n"
        self.clientSocket.sendall(line.encode("utf-8"))  # sockets carry bytes
        logging.debug(line)

    def on_data(self, raw_data):
        """Called when raw data is received from the connection.

        Return False to stop the stream and close the connection.
        """
        try:
            data = json.loads(raw_data)
            self.dataProcessing(data)
            return True
        except Exception as e:
            logging.error("An unhandled exception has occurred, check your data processing")
            logging.error(e)
            return False

    def on_error(self, status_code):
        """Called when a non-200 status code is returned."""
        logging.error("A non-200 status code is returned: {}".format(status_code))
        return False

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        logging.error("An unhandled exception has occurred")
        logging.error(exception)
        return
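The listener writes one JSON object per line to the socket, so the receiving side can split the byte stream on newlines to recover each record. The sketch below shows that framing on a local socket pair under Python 3; `send_record` and `read_records` are hypothetical helper names, the sample values are made up, and the reader only imitates what a line-oriented consumer such as Spark's socket source would do.

```python
import json
import socket


def send_record(sock, record):
    """Send one record as a JSON line, encoded to bytes for the socket."""
    payload = (json.dumps(record) + "\n").encode("utf-8")
    sock.sendall(payload)


def read_records(sock):
    """Read newline-delimited JSON records until the sender closes the socket."""
    with sock.makefile("r", encoding="utf-8") as reader:
        return [json.loads(line) for line in reader]


# A connected pair of sockets stands in for the proxy socket and its client.
sender, receiver = socket.socketpair()
send_record(sender, {"name": "alice", "followersCount": 10})  # made-up values
send_record(sender, {"name": "bob", "followersCount": 3})
sender.close()  # signals end of stream to the reader

records = read_records(receiver)
receiver.close()
print(records)
```

Without the trailing newline the reader could not tell where one record ends and the next begins, since a socket delivers an undivided stream of bytes.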
In [ ]:
# Creating a proxy socket
import errno  # error codes such as EADDRINUSE

def createProxySocket(host, port):
    """Return a socket connected to the client (Spark),
    which can be used to send it data.
    """
    try:
        s = socket.socket()  # initialize socket instance
        s.bind((host, port))  # bind to the given host and port
        s.listen(5)  # enable the server to accept connections
        logging.info("Listening on the port {}".format(port))
        cSocket, address = s.accept()  # wait for a connection
        logging.info("Received Request from: {}".format(address))
        return cSocket
    except socket.error as e:
        if e.errno == errno.EADDRINUSE:  # address in use
            logging.error("The given host:port {}:{} is already in use"
                          .format(host, port))
            logging.info("Trying on port: {}".format(port + 1))
            return createProxySocket(host, port + 1)
        raise  # any other socket error is not handled here
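The port fallback can also be written as a bounded loop, which avoids unbounded recursion when many ports are taken. This is a sketch of an alternative, not the notebook's function: `bind_with_fallback` and its `max_tries` parameter are hypothetical, and it stops after binding rather than waiting for a client.

```python
import errno
import socket


def bind_with_fallback(host, port, max_tries=10):
    """Bind to the first free port in [port, port + max_tries) and listen on it."""
    for candidate in range(port, port + max_tries):
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind((host, candidate))
            server.listen(5)
            return server, candidate
        except socket.error as error:
            server.close()
            if error.errno != errno.EADDRINUSE:
                raise  # only "address in use" triggers the fallback
    raise RuntimeError("No free port found in the given range")


# Occupy a port chosen by the operating system, then ask for that same port.
blocker = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
blocker.bind(("127.0.0.1", 0))
blocker.listen(1)
busy_port = blocker.getsockname()[1]

server, chosen_port = bind_with_fallback("127.0.0.1", busy_port)
print(chosen_port != busy_port)

server.close()
blocker.close()
```

The caller gets back the port that was actually used, which matters because the client has to connect to that port rather than the one originally requested.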
The major drawback of the Streaming API is that it provides only a sample of the tweets being posted. The actual percentage of total tweets received varies heavily with the criteria requested and the current traffic. Studies have estimated that users of the Streaming API can expect to receive anywhere from 1% to over 40% of tweets in near real time. You do not receive all of the tweets because Twitter does not have the infrastructure to support that and does not want to; full access is what the Twitter Firehose is for.
So we will use a workaround: get the top trending topics and use them to filter the stream.
In [ ]:
def getWOEIDForTrendsAvailable(api, place):
    """Return the WOEID of the place if trends are available there, else None."""
    woeid = None  # stays None when the place is not found
    # Iterate through the places that have trends
    data = api.trends_available()
    for item in data:
        if item["name"] == place:  # Use place = "Worldwide" to get woeid of world
            woeid = item["woeid"]
            break
    return woeid  # e.g. the woeid for name = "India"
In [ ]:
# Get the list of trending topics from twitter
def getTrendingTopics(api, woeid):
    """Get the top trending topics from twitter"""
    data = api.trends_place(woeid)
    listOfTrendingTopic = [trend["name"] for trend in data[0]["trends"]]
    return listOfTrendingTopic
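The two helpers above rely only on the shape of the responses: a list of places with "name" and "woeid" keys, and a one-element list whose "trends" entry holds dicts with a "name" key. The sketch below runs the same lookups on hand-written data so the logic can be checked offline. All values are made up for illustration, apart from WOEID 1 for "Worldwide", and real responses carry more fields.

```python
def find_woeid(places, name):
    """Return the WOEID of the place called name, or None if it is not listed."""
    for place in places:
        if place["name"] == name:
            return place["woeid"]
    return None


def trend_names(trends_response):
    """Extract the topic names from a trends response of the assumed shape."""
    return [trend["name"] for trend in trends_response[0]["trends"]]


# Hand-written sample data; only the keys mirror what the functions above read.
sample_places = [
    {"name": "Worldwide", "woeid": 1},
    {"name": "Exampleland", "woeid": 12345},  # hypothetical place and id
]
sample_trends = [
    {"trends": [{"name": "#ExampleTopic"}, {"name": "Another topic"}]}
]

print(find_woeid(sample_places, "Worldwide"))
print(find_woeid(sample_places, "Nowhere"))
print(trend_names(sample_trends))
```

Returning None for an unknown place lets the caller decide what to do, for example falling back to the worldwide WOEID.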
In [ ]:
if __name__ == "__main__":
    try:
        api, auth = connectToTwitter()  # connecting to twitter
        # Global information is available by using 1 as the WOEID
        # woeid = getWOEIDForTrendsAvailable(api, "Worldwide")  # get the worldwide woeid
        woeid = 1
        trendingTopics = getTrendingTopics(api, woeid)
        host = "localhost"
        port = 8888
        cSocket = createProxySocket(host, port)  # Creating a socket
        tweetStream = tweepy.Stream(auth, TweetListner(cSocket))  # Stream the twitter data
        tweetStream.filter(track=trendingTopics)  # Filter on trending topics
    except KeyboardInterrupt:  # Keyboard interrupt called
        logging.error("KeyboardInterrupt was hit")
    except Exception as e:
        logging.error("Unhandled exception has occurred")
        logging.error(e)